import os
import random
import subprocess
import socket
import time
import threading

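# 1. Runtime environment
#   PyCharm
#   Python 3.10
#   Test framework: pytest (PyCharm Settings -> Tools -> Testing -> pytest)

# 2. Configuring the test script
#   hdc must be configured in the environment variables; NORMAL_HEAD is the hdc executable.
#   Set TEST_FILE_PATH to the directory that holds the test files, and put hdc.log, log, 100M and
#   entry-default-signed-debug.hap into that directory.
#   To test TCP-mode connections, the device and the PC must be on the same network (e.g. a phone hotspot);
#   set TCP_CFG['ip'] to the device's IP address.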


# TCP connection settings used to build the TCP_CONN commands below.
# 'ip' is empty by default; test_tcp_mode skips itself while it stays empty.
TCP_CFG = {
    'ip': '',
    'port': "8710",
}

# Default prefix placed in front of every hdc sub-command (note the trailing space).
NORMAL_HEAD = "hdc "
# Directory on the device that the remote test paths are built from.
REMOTE_PATH = "/data/local/tmp/"
# IP address of this host; overwritten by extract_ip().
IP = ""
# Absolute local directory that holds the test files, with a trailing backslash so that
# file names can simply be appended.
TEST_FILE_PATH = os.path.abspath("D:/Download/sdk/") + '\\'

# Application package used by the install/uninstall commands: the .hap file name (looked up
# under TEST_FILE_PATH) and the bundle name that check_install searches for in "bm dump -a".
HAP_ONE = {
    'HAP_NAME': "entry-default-signed-debug.hap",
    'PACKAGE_NAME': "com.hmos.diagnosis",
}

# hdc sub-commands for switching to a TCP connection. The strings are built once at import
# time, so TCP_CFG must be edited in this file rather than changed at run time.
# NOTE(review): 'bright_screen' presumably wakes the device screen; it is not referenced in this file.
TCP_CONN = {
    'bright_screen': "shell \"power-shell setmode 602\"",
    'tmode': "tmode port " + TCP_CFG['port'],
    'tconn': "tconn " + TCP_CFG['ip'] + ":" + TCP_CFG['port'],
}

# Local/remote path pairs used to build the "file send" / "file recv" commands.
# Every entry has a 'local' path under TEST_FILE_PATH and a 'remote' path under REMOTE_PATH.
PATH = {
    # Single file pushed to the device.
    'file_send': {
        'local': TEST_FILE_PATH + "hdc.log",
        'remote': REMOTE_PATH + "hdc.log"
    },
    # Directory pushed to the device.
    'dir_send': {
        'local': TEST_FILE_PATH + "log",
        'remote': REMOTE_PATH + "log"
    },
    # Pull of the remote "log" path into the local "dev_data" path.
    'file_recv': {
        'remote': REMOTE_PATH + "log",
        'local': TEST_FILE_PATH + "dev_data"
    },
    # Pull of the remote "log" path into the local "hdc\log" path.
    'dir_recv': {
        'remote': REMOTE_PATH + "log",
        'local': TEST_FILE_PATH + "hdc\\log"
    },
    # File sent by test_file_cmd before the concurrent transfers start.
    'file_empty': {
        'local': TEST_FILE_PATH + "empty.txt",
        'remote': REMOTE_PATH + "empty.txt"
    }
}

# hdc sub-commands that are not part of the per-device basic set.
EXTRA_COMMANDS = {
    # Commands executed by run_global_cmd.
    'global': ["kill -r", "kill", "-l5 start", "start -r", "-v", "version", "checkserver"],
    # Privilege switches; command_callback expects "whoami" to report "shell" after
    # "smode -r" and "root" after "smode".
    'smode': ["smode -r", "smode"],
    # Appended to the basic set by run_device_cmd; command_callback sleeps 35 s after it.
    'boot': ["target boot"],
    # Option prefix that is followed by a device identifier to address one device.
    'choose': "-t "
}

# Basic hdc sub-commands grouped by task; get_basic_commands() flattens the groups in
# this order into one list.
BASIC_COMMANDS = {
    'shell': [
        "shell \"ls\""
    ],
    # The first entry ("list targets") is also the command get_devs() runs.
    'component': [
        "list targets", "list targets -v", "target mount"
    ],
    # Transfers built from PATH; these are the commands command_judge/command_callback recognise.
    'file_task': [
        "file send " + PATH['file_send']['local'] + " " + PATH['file_send']['remote'],
        "file send " + PATH['dir_send']['local'] + " " + PATH['dir_send']['remote'],
        "file recv " + PATH['file_recv']['remote'] + " " + PATH['file_recv']['local'],
        "file recv " + PATH['dir_recv']['remote'] + " " + PATH['dir_recv']['local']
    ],
    # Port-forwarding commands using fixed port numbers.
    'fport_task': [
        "fport tcp:1234 tcp:1080",
        "rport tcp:13608 localabstract:8888BananaBanana",
        "fport ls",
        "fport rm tcp:1234 tcp:1080"
    ],
    # Install the HAP from TEST_FILE_PATH, then uninstall it by bundle name.
    'install_task': [
        "install " + TEST_FILE_PATH + HAP_ONE['HAP_NAME'],
        "uninstall " + HAP_ONE['PACKAGE_NAME']
    ]
}

# File sets iterated by TestCommands.test_file_cmd. For each set:
#   send_file      - local file that is pushed to the device
#   send_file_one  - remote destination of the first concurrent "file send"
#   send_file_two  - remote destination of the second concurrent "file send"
#   recv_file      - remote file that is first uploaded and then pulled back
#   recv_file_one  - local destination of the first concurrent "file recv"
#   recv_file_two  - local destination of the second concurrent "file recv"
# The loop index is inserted into the destination names by mix_path().
TEST_FILES = {
    'one': {
        'send_file': TEST_FILE_PATH + "100M.txt",
        'send_file_one': REMOTE_PATH + "a100M.txt",
        'send_file_two': REMOTE_PATH + "c100M.txt",
        'recv_file': REMOTE_PATH + "recv100M.txt",
        'recv_file_one': TEST_FILE_PATH + "recv100M.txt",
        'recv_file_two': TEST_FILE_PATH + "recv200M.txt",
    },
    'two': {
        'send_file': TEST_FILE_PATH + "hdc_file.log",
        'send_file_one': REMOTE_PATH + "send_one.log",
        'send_file_two': REMOTE_PATH + "send_two.log",
        'recv_file': REMOTE_PATH + "hdcd.log",
        'recv_file_one': TEST_FILE_PATH + "recv_one.log",
        'recv_file_two': TEST_FILE_PATH + "recv_two.log",
    }
}

def command_judge(cmd):
    """Return True when *cmd* is a transfer or install command in its canonical form.

    Recognised forms are a command starting with ``file send`` or ``file recv`` that has
    exactly four whitespace-separated tokens, and a command starting with ``install``
    that has exactly two tokens. Everything else yields False.
    """
    token_count = len(cmd.split())
    # Prefix -> number of tokens the command must consist of.
    expected_tokens = {'file send': 4, 'file recv': 4, 'install': 2}
    return any(
        cmd.startswith(prefix) and token_count == wanted
        for prefix, wanted in expected_tokens.items()
    )


def command_callback(cmd, head, need_del, res=""):
    """Run the follow-up check that belongs to the command that was just executed.

    :param cmd: the hdc sub-command without its prefix
    :param head: the prefix the command was run with; reused for the verification commands
    :param need_del: passed on to the transfer checks; when true the transfer output must
        also contain "FileTransfer finish"
    :param res: captured output of the command (empty when it was not captured)

    Commands that match none of the known forms are ignored.
    """
    tokens = cmd.split()
    is_transfer = len(tokens) == 4
    if is_transfer and cmd.startswith('file send'):
        if need_del:
            assert "FileTransfer finish" in res
        check_file_send(tokens[2], tokens[3], head, need_del)
    elif is_transfer and cmd.startswith('file recv'):
        if need_del:
            assert "FileTransfer finish" in res
        check_file_recv(tokens[2], tokens[3], head, need_del)
    elif len(tokens) == 2 and cmd.startswith('install'):
        check_install(head, res)
    elif cmd == 'smode':
        # Wait before asking the device who we are.
        time.sleep(4)
        check_root(head)
    elif cmd == 'smode -r':
        time.sleep(4)
        check_user(head)
    elif cmd == 'target boot':
        # Only waits; nothing is verified after the boot command.
        time.sleep(35)


def check_file_send(local_file, remote_file, head, need_del):
    """Verify that *local_file* arrived on the device as *remote_file*.

    The remote path must exist with the same kind (file or directory) as the local one.
    For a regular file the MD5 sums of both sides are compared as well. The remote copy is
    removed through rm_send_file (which only deletes when *need_del* is true) before the
    MD5 comparison is asserted.
    """
    local_kind = get_win_file_type(local_file)
    # Shell test flag matching the local kind; "dir" takes precedence over "file".
    if "dir" in local_kind:
        test_flag = '-d'
    elif 'file' in local_kind:
        test_flag = '-f'
    else:
        test_flag = ""
    res = run_command_stdout(f'shell "[ {test_flag} {remote_file} ] && echo yes || echo no"', head)
    assert 'yes' in res
    if test_flag != '-d':
        local_md5 = get_win_md5(local_file)
        remote_md5 = get_md5(remote_file, head)
        rm_send_file(remote_file, head, need_del)
        assert local_md5 == remote_md5
        print("check_file_send success ", res)
        return
    # Directories are only checked for existence.
    rm_send_file(remote_file, head, need_del)


def rm_send_file(file_path, head, need_del):
    """Delete *file_path* on the device with ``rm -rf`` when *need_del* is true."""
    if not need_del:
        return
    run_command(f'shell "rm -rf {file_path}"', head)


def check_file_recv(remote_file, local_file, head, need_del):
    """Verify that *remote_file* was pulled from the device into *local_file*.

    A received directory is only cleaned up. For a regular file the ``attrib`` output must
    not contain ``-`` and the MD5 sums of both sides must match. The local copy is removed
    through rm_recv_file, which only deletes when *need_del* is true.
    """
    local_kind = get_win_file_type(local_file)
    if 'dir' not in local_kind:
        attrib_out = run_command_stdout(f"attrib {local_file}", "")
        md5_local = get_win_md5(local_file)
        md5_remote = get_md5(remote_file, head)
        # NOTE(review): presumably guards against attrib's "... not found - <path>" message - confirm.
        assert '-' not in attrib_out
        matched = md5_local == md5_remote
        if not matched:
            print("check_file_recv fail ", remote_file, local_file)
        assert matched
        rm_recv_file(local_file, need_del)
        print("check_file_recv success ", attrib_out)
    else:
        rm_recv_file(local_file, need_del)


def get_win_file_type(file_path):
    """Return the raw shell output naming the kind of the local path ("dir" or "file").

    The path must exist, otherwise the assertion fails. Both checks are run through the
    host command interpreter (empty command prefix).
    """
    exists_out = run_command_stdout(f"if exist {file_path} echo yes", '')
    assert "yes" in exists_out
    return run_command_stdout(f"dir/ad {file_path} >nul 2>nul && echo dir || echo file", '')


def rm_recv_file(file_path, need_del):
    """Delete the local *file_path* (file or directory) when *need_del* is true."""
    if not need_del:
        return
    kind = get_win_file_type(file_path)
    # Marker in the reported kind -> host command prefix that removes such a path.
    removers = (("dir", "rmdir /s/q "), ("file", "del "))
    for marker, remove_cmd in removers:
        if marker in kind:
            run_command(remove_cmd + file_path, "")


def check_install(head, res):
    """Assert that an install command succeeded.

    :param head: command prefix used to query the device
    :param res: captured output of the install command

    The output must contain the success message, and the bundle name from HAP_ONE must
    show up in the output of ``bm dump -a`` on the device.
    """
    print("check_install")
    print(res)
    reported_ok = "msg:install bundle successfully." in res
    if not reported_ok:
        print("install msg error")
    assert reported_ok
    bundle_dump = run_command_stdout('shell "bm dump -a"', head)
    package = HAP_ONE['PACKAGE_NAME']
    is_listed = package in bundle_dump
    if is_listed:
        print("check_install success ", package)
    assert is_listed


def check_root(head):
    """Assert that ``whoami`` on the device reports a name containing "root"."""
    whoami_out = run_command_stdout('shell "whoami"', head)
    print("check_root res: ", whoami_out)
    assert 'root' in whoami_out


def check_user(head):
    """Assert that ``whoami`` on the device reports a name containing "shell".

    :return: the result of that containment check
    """
    whoami_out = run_command_stdout('shell "whoami"', head)
    print("check_user res: ", whoami_out)
    is_shell_user = 'shell' in whoami_out
    assert is_shell_user
    return is_shell_user


def get_devs(head=NORMAL_HEAD):
    """Run the first 'component' command (``list targets``) and return its output split on whitespace.

    :param head: command prefix to run the listing with
    :return: list of whitespace-separated tokens of the listing output
    """
    list_cmd = BASIC_COMMANDS['component'][0]
    devs = run_command_stdout(list_cmd, head).split()
    print(devs)
    return devs


def tmode_to_tcp():
    """Issue the ``tmode`` and ``tconn`` commands from TCP_CONN.

    :return: True when the ``tconn`` output contains "Connect OK", otherwise False
    """
    run_command(TCP_CONN['tmode'])
    reply = run_command_stdout(TCP_CONN['tconn'])
    print(reply)
    return "Connect OK" in reply


def remote_server_start(server_head):
    """Launch ``<server_head>-m`` through os.popen without waiting for it or reading its output.

    :param server_head: command prefix ending in a space, e.g. "hdc -s <ip>:<port> "
    """
    launch_cmd = f"{server_head}-m"
    print(launch_cmd)
    os.popen(launch_cmd)


def run_command(cmd, head=NORMAL_HEAD, need_del=True, need_callback=True):
    """Run ``head + cmd`` through the shell, wait for it, and optionally run its follow-up check.

    :param cmd: command text without the prefix
    :param head: command prefix; an empty prefix runs *cmd* as a plain host command and
        suppresses echoing of the command line
    :param need_del: forwarded to command_callback
    :param need_callback: when true, command_callback is invoked (with no captured output)
    """
    full_cmd = head + cmd
    if head:
        print(full_cmd)

    process = subprocess.Popen(full_cmd, shell=True)
    process.communicate()
    if need_callback:
        command_callback(cmd, head, need_del)


def run_command_stdout(cmd, head=NORMAL_HEAD, need_del=True, need_callback=True):
    """Run ``head + cmd`` through the shell and return its decoded standard output.

    :param cmd: command text without the prefix
    :param head: command prefix; with an empty prefix the output is decoded as GBK,
        otherwise as UTF-8
    :param need_del: forwarded to command_callback
    :param need_callback: when true, command_callback is invoked with the captured output
    :return: the decoded standard output
    """
    full_cmd = head + cmd
    host_only = head == ''
    # Commands containing "echo" are not echoed to the console.
    if not host_only and 'echo' not in cmd:
        print(full_cmd)
    encoding = 'gbk' if host_only else "UTF-8"
    process = subprocess.Popen(full_cmd, shell=True, stdout=subprocess.PIPE)
    raw_out = process.communicate()[0]
    output = raw_out.decode(encoding)
    if need_callback:
        command_callback(cmd, head, need_del, output)
    return output


def run_commands(cmds, head=NORMAL_HEAD, need_del=True):
    """Run every command in *cmds* in order through run_command (output is not captured)."""
    for single_cmd in cmds:
        run_command(single_cmd, head=head, need_del=need_del)


def get_basic_commands():
    """Return a new flat list of all BASIC_COMMANDS entries, in group order."""
    return [cmd for group in BASIC_COMMANDS.values() for cmd in group]


def run_global_cmd():
    """Run the 'global' commands from EXTRA_COMMANDS with the default prefix."""
    run_commands(EXTRA_COMMANDS['global'])


def run_split_commands(commands, head=NORMAL_HEAD):
    """Run *commands* in order, capturing output only for commands that command_judge accepts.

    Every command is run with ``need_del=False``.
    """
    for single_cmd in commands:
        runner = run_command_stdout if command_judge(single_cmd) else run_command
        runner(single_cmd, head, False)


def run_device_cmd(head=NORMAL_HEAD):
    """Run the basic command set once per privilege mode against one device.

    :param head: command prefix addressing the device under test

    The local test directory is first pushed to the remote "log" path. Then, for each
    entry in EXTRA_COMMANDS['smode'], the mode switch is issued and the basic commands
    plus the boot command are executed. After "smode -r", every command that refers to
    the remote "log" path is left out.
    """
    run_command_stdout("file send " + PATH['dir_send']['local'] + " " + PATH['dir_recv']['remote'], head, False)
    remote_log = REMOTE_PATH + "log"
    for smd in EXTRA_COMMANDS['smode']:
        # Switch the mode on the device selected by `head`, so that the whoami check in
        # command_callback also queries that device.
        run_command(smd, head)
        commands = get_basic_commands() + EXTRA_COMMANDS['boot']
        if smd == "smode -r":
            kept = []
            for item in commands:
                if remote_log in item:
                    print(item)
                else:
                    kept.append(item)
            # Build a new list instead of removing while iterating: removal during
            # iteration skips the element that follows each removed one.
            commands = kept
        run_split_commands(commands, head)


def extract_ip():
    """Determine this host's IPv4 address, store it in the module-level IP, and return it.

    A datagram socket is connected towards 10.255.255.255 and the local address of that
    socket is read back. Any exception during that step results in an empty string.
    """
    global IP
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect(('10.255.255.255', 1))
            IP = probe.getsockname()[0]
        except Exception:
            IP = ""
    return IP


def mix_path(path, i):
    """Insert *i* in front of the first dot of the file name in *path*.

    :param path: local or remote path; both ``/`` and ``\\`` count as separators
    :param i: index to insert; converted with ``str()``
    :return: the modified path, or *path* unchanged when the file name has no dot or
        starts with one

    Only the last path component is searched, so a dot in a directory name
    (e.g. ``sdk.v1\\recv.txt``) no longer receives the index.
    """
    name_start = max(path.rfind('/'), path.rfind('\\')) + 1
    dot = path.find('.', name_start)
    if dot > name_start:
        return path[:dot] + str(i) + path[dot:]
    return path


def file_send(send_path, recv_path, i, wait_time=0):
    """Sleep *wait_time* seconds, then send *send_path* to *recv_path* with index *i* mixed in.

    The index is inserted into the destination by mix_path; the command output is printed.
    """
    time.sleep(wait_time)
    destination = mix_path(recv_path, str(i))
    output = run_command_stdout(f"file send {send_path} {destination}")
    print(output)


def file_recv(remote_path, recv_path, i, wait_time=0):
    """Sleep *wait_time* seconds, then pull *remote_path* into *recv_path* with index *i* mixed in.

    The index is inserted into the destination by mix_path; the command output is printed.
    """
    time.sleep(wait_time)
    destination = mix_path(recv_path, str(i))
    output = run_command_stdout(f"file recv {remote_path} {destination}")
    print(output)


def get_win_md5(file_path):
    """Return the MD5 of a local file as reported by ``certutil -hashfile``.

    The last 32-character token of the certutil output is taken as the digest; when no
    such token exists the placeholder "win_md5" is returned.
    """
    certutil_cmd = "certutil -hashfile " + os.path.abspath(file_path) + " MD5"
    tokens = run_command_stdout(certutil_cmd, "").split()
    digests = [token for token in tokens if len(token) == 32]
    return digests[-1] if digests else "win_md5"


def get_md5(file_path, head=NORMAL_HEAD):
    """Return the first token of ``md5sum <file_path>`` run on the device.

    Raises IndexError when the command produced no output.
    """
    output = run_command_stdout(f'shell "md5sum {file_path}"', head)
    digest = output.split()[0]
    return digest


class TestCommands:
    """pytest cases that drive the hdc client through the helper functions of this module."""

    def test_file_cmd(self):
        """Run concurrent file transfers for every file set in TEST_FILES.

        For each set an empty file and the set's source file are uploaded first. Then ten
        rounds follow, each starting two sends and two receives in parallel threads; the
        second thread of each pair is delayed by a random time below one second (no delay
        in round 0). The round index is mixed into the destination names.
        """
        print("HDC TEST: start test_file_cmd\n")
        for files in TEST_FILES.values():
            send_file = os.path.abspath(files['send_file'])
            send_file_one = files['send_file_one']
            send_file_two = files['send_file_two']

            recv_file = files['recv_file']
            recv_file_one = os.path.abspath(files['recv_file_one'])
            recv_file_two = os.path.abspath(files['recv_file_two'])

            # The empty-file upload runs without the follow-up check.
            run_command_stdout("file send " + os.path.abspath(PATH['file_empty']['local']) + ' '
                               + PATH['file_empty']['remote'], NORMAL_HEAD, False, False)
            # Upload the file that the receive threads pull back; it is kept on the device.
            run_command_stdout("file send " + send_file + ' ' + recv_file, NORMAL_HEAD, False)
            for i in range(10):
                wait_time = 0 if i == 0 else random.uniform(0, 1)
                print("HDC TEST: start test_file_cmd \n" + str(i))
                workers = [
                    threading.Thread(target=file_send, args=(send_file, send_file_one, i)),
                    threading.Thread(target=file_send, args=(send_file, send_file_two, i, wait_time)),
                    threading.Thread(target=file_recv, args=(recv_file, recv_file_one, i)),
                    threading.Thread(target=file_recv, args=(recv_file, recv_file_two, i, wait_time)),
                ]
                for worker in workers:
                    worker.start()
                for worker in workers:
                    worker.join()

    def test_global_server(self):
        """Run the global (server-level) commands."""
        print("HDC TEST: start test_global_server_cmd\n")
        run_global_cmd()

    def test_device_cmd(self):
        """Run the per-device command set on the only device, or on each listed device in turn."""
        print("HDC TEST: start test_device_cmd\n")
        devs = get_devs()
        if len(devs) == 1:
            run_device_cmd()
        elif len(devs) > 1:
            for dev in devs:
                # Address each device explicitly with the "-t <device>" option.
                run_device_cmd(NORMAL_HEAD + EXTRA_COMMANDS['choose'] + dev + " ")

    def test_tcp_mode(self):
        """Switch to a TCP connection and run the basic commands over it.

        The test is skipped (returns early) when the host IP cannot be determined or
        TCP_CFG['ip'] is not configured.
        """
        print("HDC TEST: start test_tcp_mode\n")
        host_ip = extract_ip()
        if len(host_ip) == 0 or TCP_CFG['ip'] == '':
            print("请连接热点 配置TCP_CFG")
            return
        if tmode_to_tcp():
            # run_split_commands captures the output of transfer/install commands.
            # run_commands would hand an empty output to check_install, which then
            # always fails on the missing success message.
            run_split_commands(get_basic_commands(), NORMAL_HEAD)

    def test_remote_server(self):
        """Restart hdc as a server bound to the host IP and run the basic commands through it.

        The running server is killed first; when that does not report "finish" the test
        returns early. The server started here is killed again at the end, also when one
        of the checks fails.
        """
        time.sleep(10)
        print("HDC TEST: start test_remote_server\n")
        host_ip = extract_ip()
        ret = run_command_stdout("kill")
        if 'finish' not in ret:
            print('test_remote_server kill server failed')
            return
        server_head = "hdc -s " + host_ip + ":" + "8710 "
        # Called directly: passing the *result* of a call as a Thread target runs the
        # function in the current thread anyway and starts a thread that does nothing.
        remote_server_start(server_head)
        try:
            for dev in get_devs(server_head):
                head = server_head + EXTRA_COMMANDS['choose'] + dev + " "
                run_command_stdout("file send " + PATH['dir_send']['local'] + " " + PATH['dir_recv']['remote'],
                                   head, False)
                # Devices are handled one after another: the commands use fixed ports and
                # local paths, and the server must stay up until every device is done.
                run_split_commands(get_basic_commands(), head)
        finally:
            run_command("kill", server_head)
